package com.tl.spark.scala

import java.util.Date

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, TableOutputFormat}
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
  * @program: spark-test
  * @description: 批量数据写入hfile中然后 通过hbase的doBulkLoad方法批量快速导入
  * @author: dong.tl
  * @create: 2018-09-13 17:33
  **/
object ScalaDoBulkLoadHbase {

  /**
    * Writes sample rows out as HFiles via [[HFileOutputFormat2]] and then moves
    * them into the HBase table "parse_xml" with `LoadIncrementalHFiles.doBulkLoad`,
    * which is much faster than row-by-row `Put`s for large batches.
    *
    * @param args unused command-line arguments
    */
  def main(args: Array[String]): Unit = {

    // HBase configuration
    val conf = HBaseConfiguration.create()
    // ZooKeeper quorum; could also come from an hbase-site.xml on the classpath,
    // but setting it explicitly keeps the job self-contained
    conf.set(HConstants.ZOOKEEPER_QUORUM, "db01,db02,db03")
    // ZooKeeper client port (2181 is the default)
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    //    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    // Target table for the bulk load
    conf.set(TableOutputFormat.OUTPUT_TABLE, "parse_xml")

    // Spark configuration: app name, master address, and the job jar to ship
    val sparkConf = new SparkConf()
      .setAppName("read and write for hbase ")
      //.setMaster("local")
      .setMaster("spark://192.168.173.235:7077")
      .setJars(Seq("F:\\IdeaProjects\\spark-test\\target\\spark-test-1.0-SNAPSHOT.jar"))
    val sc = new SparkContext(sparkConf)

    // HBase table name
    val tableName = "parse_xml"
    // The connection reads the HBase master location from the configuration above
    val conn = ConnectionFactory.createConnection(conf)
    // Table handle and region layout; the region locator lets HFileOutputFormat2
    // partition the output so each HFile lands inside a single region
    val table2: Table = conn.getTable(TableName.valueOf(tableName))
    val regionLocator: RegionLocator = conn.getRegionLocator(TableName.valueOf(tableName))

    try {
      // Configure the MR job for an incremental (HFile) load.
      // BUGFIX: the original mixed a manual TableOutputFormat/Result output
      // configuration with configureIncrementalLoadMap (the overload for
      // Map-valued output). configureIncrementalLoad is the correct call for
      // KeyValue output: it sets the output format, key/value classes, and a
      // TotalOrderPartitioner matching the table's current regions.
      val job = Job.getInstance(conf)
      HFileOutputFormat2.configureIncrementalLoad(job, table2.getDescriptor, regionLocator)

      val arr = ArrayBuffer(("A", " aaaa"), ("B", "bbbb"), ("C", "cccc"))
      arr.append(("EEEEE", "EEEEE"))

      // HFiles require cells in strictly increasing rowkey order, so sort by key
      // before writing (makeRDD alone gives no ordering guarantee across partitions).
      val rdd1 = sc.makeRDD(arr).sortByKey().map { case (rowKey, value) =>
        println(rowKey + ":" + value)
        val kv: KeyValue = new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes("info"),
          Bytes.toBytes("name"), Bytes.toBytes(value + " 测试"))
        // BUGFIX: the key must carry the rowkey bytes; the original emitted an
        // empty ImmutableBytesWritable, which breaks the partitioner's ordering.
        (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv)
      }

      // Unique output directory for this run's HFiles
      val path = "hdfs://db03:9000/test/wc/out_" + new Date().getTime
      rdd1.saveAsNewAPIHadoopFile(path, classOf[ImmutableBytesWritable], classOf[KeyValue],
        classOf[HFileOutputFormat2], job.getConfiguration())

      // Move the generated HFiles into the table's regions
      val bulkLoader = new LoadIncrementalHFiles(conf)
      bulkLoader.doBulkLoad(new Path(path), conn.getAdmin, table2, regionLocator)
    } finally {
      // BUGFIX: the original left this finally block empty, leaking the table,
      // the connection, and the SparkContext.
      table2.close()
      conn.close()
      sc.stop()
    }
  }
}
